{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Pi is roughly 3.137480\n"
     ]
    }
   ],
   "source": [
    "from __future__ import print_function\n",
    "import sys\n",
    "from random import random\n",
    "from operator import add\n",
    "\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "if __name__ == \"__main__\":\n",
    "    \"\"\"\n",
    "        Pi\n",
    "    \"\"\"\n",
    "    spark = SparkSession\\\n",
    "        .builder\\\n",
    "        .appName(\"PythonPi\")\\\n",
    "        .getOrCreate()\n",
    "\n",
    "    partitions = 1\n",
    "    n = 100000 * partitions\n",
    "\n",
    "    def f(_):\n",
    "        x = random() * 2 - 1\n",
    "        y = random() * 2 - 1\n",
    "        return 1 if x ** 2 + y ** 2 <= 1 else 0\n",
    "\n",
    "    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)\n",
    "    print(\"Pi is roughly %f\" % (4.0 * count / n))\n",
    "\n",
    "#     spark.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running ALS with M=100, U=500, F=10, iters=5, partitions=2\n",
      "\n",
      "Iteration 0:\n",
      "\n",
      "RMSE: 0.2229\n",
      "\n",
      "Iteration 1:\n",
      "\n",
      "RMSE: 0.0731\n",
      "\n",
      "Iteration 2:\n",
      "\n",
      "RMSE: 0.0317\n",
      "\n",
      "Iteration 3:\n",
      "\n",
      "RMSE: 0.0315\n",
      "\n",
      "Iteration 4:\n",
      "\n",
      "RMSE: 0.0315\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "WARN: This is a naive implementation of ALS and is given as an\n",
      "      example. Please use pyspark.ml.recommendation.ALS for more\n",
      "      conventional use.\n"
     ]
    }
   ],
   "source": [
    "\"\"\"\n",
    "This is an example implementation of ALS for learning how to use Spark. Please refer to\n",
    "pyspark.ml.recommendation.ALS for more conventional use.\n",
    "This example requires numpy (http://www.numpy.org/)\n",
    "\"\"\"\n",
    "from __future__ import print_function\n",
    "import sys\n",
    "import numpy as np\n",
    "from numpy.random import rand\n",
    "from numpy import matrix\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "LAMBDA = 0.01   # regularization\n",
    "np.random.seed(42)\n",
    "\n",
    "def rmse(R, ms, us):\n",
    "    diff = R - ms * us.T\n",
    "    return np.sqrt(np.sum(np.power(diff, 2)) / (M * U))\n",
    "\n",
    "def update(i, mat, ratings):\n",
    "    uu = mat.shape[0]\n",
    "    ff = mat.shape[1]\n",
    "    XtX = mat.T * mat\n",
    "    Xty = mat.T * ratings[i, :].T\n",
    "    for j in range(ff):\n",
    "        XtX[j, j] += LAMBDA * uu\n",
    "    return np.linalg.solve(XtX, Xty)\n",
    "\n",
    "if __name__ == \"__main__\":\n",
    "\n",
    "    \"\"\"\n",
    "    Usage: als [M] [U] [F] [iterations] [partitions]\"\n",
    "    \"\"\"\n",
    "\n",
    "    print(\"\"\"WARN: This is a naive implementation of ALS and is given as an\n",
    "      example. Please use pyspark.ml.recommendation.ALS for more\n",
    "      conventional use.\"\"\", file=sys.stderr)\n",
    "\n",
    "    spark = SparkSession\\\n",
    "        .builder\\\n",
    "        .appName(\"PythonALS\")\\\n",
    "        .getOrCreate()\n",
    "\n",
    "    sc = spark.sparkContext\n",
    "\n",
    "    M = 100\n",
    "    U = 500\n",
    "    F = 10\n",
    "    ITERATIONS = 5\n",
    "    partitions = 2\n",
    "\n",
    "    print(\"Running ALS with M=%d, U=%d, F=%d, iters=%d, partitions=%d\\n\" %\n",
    "          (M, U, F, ITERATIONS, partitions))\n",
    "\n",
    "    R = matrix(rand(M, F)) * matrix(rand(U, F).T)\n",
    "    ms = matrix(rand(M, F))\n",
    "    us = matrix(rand(U, F))\n",
    "\n",
    "    Rb = sc.broadcast(R)\n",
    "    msb = sc.broadcast(ms)\n",
    "    usb = sc.broadcast(us)\n",
    "\n",
    "    for i in range(ITERATIONS):\n",
    "        ms = sc.parallelize(range(M), partitions) \\\n",
    "               .map(lambda x: update(x, usb.value, Rb.value)) \\\n",
    "               .collect()\n",
    "        # collect() returns a list, so array ends up being\n",
    "        # a 3-d array, we take the first 2 dims for the matrix\n",
    "        ms = matrix(np.array(ms)[:, :, 0])\n",
    "        msb = sc.broadcast(ms)\n",
    "\n",
    "        us = sc.parallelize(range(U), partitions) \\\n",
    "               .map(lambda x: update(x, msb.value, Rb.value.T)) \\\n",
    "               .collect()\n",
    "        us = matrix(np.array(us)[:, :, 0])\n",
    "        usb = sc.broadcast(us)\n",
    "\n",
    "        error = rmse(R, ms, us)\n",
    "        print(\"Iteration %d:\" % i)\n",
    "        print(\"\\nRMSE: %5.4f\\n\" % error)\n",
    "\n",
    "#     spark.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": false,
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Text: [Hi, I, heard, about, Spark] => \n",
      "Vector: [0.03238218240439892,0.03422726094722748,-0.014331452548503876]\n",
      "\n",
      "Text: [I, wish, Java, could, use, case, classes] => \n",
      "Vector: [0.06346431374549866,0.004565070516296795,0.004498231091669628]\n",
      "\n",
      "Text: [Logistic, regression, models, are, neat] => \n",
      "Vector: [0.005696431919932366,0.003507903963327408,-0.008991636149585248]\n",
      "\n",
      "+--------------------+--------------------+\n",
      "|                text|              result|\n",
      "+--------------------+--------------------+\n",
      "|[Hi, I, heard, ab...|[0.03238218240439...|\n",
      "|[I, wish, Java, c...|[0.06346431374549...|\n",
      "|[Logistic, regres...|[0.00569643191993...|\n",
      "+--------------------+--------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from __future__ import print_function\n",
    "from pyspark.ml.feature import Word2Vec\n",
    "\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "spark= SparkSession\\\n",
    "                .builder \\\n",
    "                .appName(\"dataFrame\") \\\n",
    "                .getOrCreate()\n",
    "\n",
    "# Input data: Each row is a bag of words from a sentence or document.\n",
    "documentDF = spark.createDataFrame([\n",
    "    (\"Hi I heard about Spark\".split(\" \"), ),\n",
    "    (\"I wish Java could use case classes\".split(\" \"), ),\n",
    "    (\"Logistic regression models are neat\".split(\" \"), )\n",
    "], [\"text\"])\n",
    "\n",
    "# Learn a mapping from words to Vectors.\n",
    "word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol=\"text\", outputCol=\"result\")\n",
    "model = word2Vec.fit(documentDF)\n",
    "\n",
    "result = model.transform(documentDF)\n",
    "for row in result.collect():\n",
    "    text, vector = row\n",
    "    print(\"Text: [%s] => \\nVector: %s\\n\" % (\", \".join(text), str(vector)))\n",
    "result.show()\n",
    "# spark.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "0\n",
      "2\n",
      "3\n",
      "4\n",
      "5\n",
      "6\n",
      "6\n",
      "7\n",
      "9\n"
     ]
    }
   ],
   "source": [
    "from __future__ import print_function\n",
    "import sys\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "if __name__ == \"__main__\":\n",
    "\n",
    "    spark = SparkSession\\\n",
    "        .builder\\\n",
    "        .appName(\"PythonSort\")\\\n",
    "        .getOrCreate()\n",
    "\n",
    "    lines = spark.read.text('numbers.txt').rdd.map(lambda r: r[0])\n",
    "    sortedCount = lines.flatMap(lambda x: x.split()) \\\n",
    "        .map(lambda x: (int(x), 1)) \\\n",
    "        .sortByKey()\n",
    "    # This is just a demo on how to bring all the sorted data back to a single node.\n",
    "    # In reality, we wouldn't want to collect all the data to the driver node.\n",
    "    output = sortedCount.collect()\n",
    "    for (num, unitcount) in output:\n",
    "        print(num)\n",
    "\n",
    "#     spark.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "and: 2\n",
      "Group: 1\n",
      "Publishing: 1\n",
      "Media: 1\n",
      "is: 1\n",
      "global: 1\n",
      "one: 1\n",
      "research,: 1\n",
      "through: 1\n",
      "in: 1\n",
      "educational: 1\n",
      "Business: 1\n",
      "Nature: 2\n",
      "May: 1\n",
      "leading: 1\n",
      "2015: 1\n",
      "publishers: 1\n",
      "Springer: 2\n",
      "combination: 1\n",
      "Science: 1\n",
      "Palgrave: 1\n",
      "worlds: 1\n",
      "created: 1\n",
      "of: 2\n",
      "Macmillan: 2\n",
      "professional: 1\n",
      "the: 2\n",
      "Education: 1\n"
     ]
    }
   ],
   "source": [
    "from __future__ import print_function\n",
    "import sys\n",
    "from operator import add\n",
    "# SparkSession：是一个对Spark的编程入口，取代了原本的SQLContext与HiveContext，方便调用Dataset和DataFrame API\n",
    "# SparkSession可用于创建DataFrame，将DataFrame注册为表，在表上执行SQL，缓存表和读取parquet文件。\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "if __name__ == \"__main__\": \n",
    "    # appName 为 Spark 应用设定一个应用名，改名会显示在 Spark Web UI 上\n",
    "    # 假如SparkSession 已经存在就取得已存在的SparkSession，否则创建一个新的。\n",
    "    spark = SparkSession\\\n",
    "        .builder\\\n",
    "        .appName(\"PythonWordCount\")\\\n",
    "        .getOrCreate()\n",
    "        \n",
    "    # 读取传入的文件内容，并写入一个新的RDD实例lines中，此条语句所做工作有些多，不适合初学者，可以截成两条语句以便理解。\n",
    "    # map是一种转换函数，将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素。\n",
    "    # 原始RDD中的数据项与新RDD中的数据项是一一对应的关系。\n",
    "    lines = spark.read.text('words.txt').rdd.map(lambda r: r[0])\n",
    "   \n",
    "    # flatMap与map类似，但每个元素输入项都可以被映射到0个或多个的输出项，最终将结果”扁平化“后输出 \n",
    "    counts = lines.flatMap(lambda x: x.split(' ')) \\\n",
    "                  .map(lambda x: (x, 1)) \\\n",
    "                  .reduceByKey(add)\n",
    "                \n",
    "    # collect() 在驱动程序中将数据集的所有元素作为数组返回。 这在返回足够小的数据子集的过滤器或其他操作之后通常是有用的。\n",
    "    # 由于collect 是将整个RDD汇聚到一台机子上，所以通常需要预估返回数据集的大小以免溢出。             \n",
    "    output = counts.collect()\n",
    "    \n",
    "    for (word, count) in output:\n",
    "        print(\"%s: %i\" % (word, count))\n",
    "        \n",
    "#     spark.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "spark.stop()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.13"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
